跳到主要内容

Go Channel 使用场景记录

因为学习 Java 时数据交流基本上是通过共享内存来实现多线程的通信的,阻塞队列使用较少,所以专门记录下 Go 中的 Channel 使用场景

Golang 中的 channel 主要用于以下几个场景:

  1. 并发编程:channel 是 Golang 中用于协程(goroutine)间通信的主要机制。它可以实现并发的协程之间的数据传输和同步,用于协程之间的消息传递、共享数据和事件通知。
  2. 同步和互斥:channel 可以用于同步协程的执行顺序,确保在特定条件满足之前,协程会一直等待。通过 channel,可以实现互斥锁和条件变量等同步原语,用于保护共享资源的并发访问。
  3. 事件驱动编程:channel 可以用于实现事件驱动的编程模型。协程可以等待特定的事件发生,而不需要进行主动的轮询操作。当事件发生时,通过将事件数据发送到相应的 channel,触发等待的协程进行处理。
  4. 缓冲和流控制:channel 可以具有缓冲区,允许一定数量的元素在发送和接收之间进行缓冲。这种机制可以用于平衡生产者和消费者之间的速度差异,提高并发性能。

channel 特性

操作值为 nil 的 channel被关闭的 channel正常的 channel
closepanicpanic成功关闭
c<-永远阻塞panic阻塞或成功发送
<-c永远阻塞永远不阻塞阻塞或成功接收

使用 range 快速读取内容

这个主要是介绍如何快速的读取通道内的数据

package main

import "fmt"

func main() {
c := make(chan int, 20)
go func() {
for i := 0; i < 10; i++ {
c <- i
}
close(c)
}()

// 当 c 被关闭后,取完里面的元素就会跳出循环
for x := range c {
fmt.Println(x)
}
}

for ... range c { do } 这种写法相当于 if _, ok := <-c; ok { do }

非阻塞的 select

select 本身是阻塞的,当所有分支都不满足就会一直阻塞,如果想不阻塞,那么一个什么都不干的 default 分支是最好的选择

select {
case <-done:
return
default:
}

如果加入了 default 分支,那么无论涉及通道操作的表达式是否有阻塞,select 语句都不会被阻塞。如果那几个表达式被阻塞了,或者说都没有满足求值的条件,那么默认分支就会被选中并执行。

for {select{}} 的终止

尽量不要用 break label 形式,而是把终止循环的条件放到 for 条件里来实现

for ok {
select {
case ch <- 0:
case <-done:
ok = false
}
}

事件驱动-检查超时

package main

import "time"

// 利用 time.After 实现
func main() {
done := do()
select {
case <-done:
// logic
case <-time.After(3 * time.Second):
// timeout
}
}

func do() <-chan struct{} {
done := make(chan struct{}, 1)
go func() {
// do something
// ...
done <- struct{}{}
}()

return done
}

这里主要利用了 select 的随机性,哪个 channel 先执行完先调用哪块 case,下面这个 chan struct{} 就是一个无意义的 channel,它唯一的作用就是联系上 select 监听。

之所以这里使用空结构体是因为空结构体不占据内存空间,因此被广泛作为各种场景下的占位符使用。一是节省资源,二是空结构体本身就具备很强的语义,即这里不需要任何值,仅作为占位符。

注意,这里要设置管道为 1,否则对于非缓存通道,无论发送操作还是接受操作一开始就是阻塞的,只有配对的操作出现才会开始执行。所以当接收方或者发送方一方没了都会造成阻塞

并发编程-取最快的结果

比较常见的一个场景是重试,第一个请求在指定超时时间内没有返回结果,这时重试第二次,取两次中最快返回的结果使用。

package main

import "fmt"

func main() {
ret := make(chan string, 3)
for i := 0; i < cap(ret); i++ {
go call(ret)
}
fmt.Println(<-ret)
}

func call(ret chan<- string) {
// do something
// ...
ret <- "result"
}

打印:

result

这里只会返回一个结果

并发编程-限制最大并发数

package main

func main() {
// 最大并发数为 2
limits := make(chan struct{}, 2)
for i := 0; i < 10; i++ {
go func() {
// 缓冲区满了就会阻塞在这
limits <- struct{}{}
do()
<-limits // 执行完了一个任务再放行队列
}()
}
}

func do() {
// do something
}

并发编程-同步多个 goroutine

利用 close 广播

package main

import "fmt"

func main() {
c := make(chan struct{})
for i := 0; i < 5; i++ {
go do(c)
}
close(c)

select {} // 阻塞
}

func do(c <-chan struct{}) {
// 会阻塞直到收到 close
<-c
fmt.Println("hello")
}

其实这个等价于 sync.WaitGroup 包的用法,如下

package main

import (
"fmt"
"sync"
)

var wg sync.WaitGroup

func main() {
wg.Add(5)
for i := 0; i < 5; i++ {
go do()
wg.Done()
}
select {} // 阻塞
}

func do() {
wg.Wait()
fmt.Println("hello")
}

同步和互斥-实现管道 Pipeline

Channels 也可以用于将多个 goroutine 连接在一起,一个 Channel 的输出作为下一个 Channel 的输入。这种串联的 Channels 就是所谓的管道(pipeline)。

下面的程序用两个 channels 将三个 goroutine 串联起来,如图8.1所示。

第一个 goroutine 是一个计数器,用于生成0、1、2、……形式的整数序列,然后通过channel将该整数序列发送给第二个goroutine;

第二个 goroutine 是一个求平方的程序,对收到的每个整数求平方,然后将平方后的结果通过第二个 channel 发送给第三个 goroutine;

第三个 goroutine 是一个打印程序,打印收到的每个整数。

// 下面这段程序会一直打印管道的内容

func main() {
naturals := make(chan int)
squares := make(chan int)

// Counter
go func() {
for x := 0; ; x++ {
naturals <- x
}
}()

// Squarer
go func() {
for {
x := <-naturals
squares <- x * x
}
}()

// Printer (in main goroutine)
for {
fmt.Println(<-squares)
}
}

上面这段并没有退出,因为 main goroutine 一直在等待 squares 的值,而 squares 一直在等待 naturals 的值,而 naturals 一直在等待 main goroutine 的值,这样就形成了一个死锁。

可以改成下面这样,使用 range 关键字,当 naturals 关闭的时候,循环就会结束,这样就不会造成死锁了。

func main() {
naturals := make(chan int)
squares := make(chan int)

// Counter
go func() {
for x := 0; x < 100; x++ {
naturals <- x
}
close(naturals)
}()

// Squarer
go func() {
for x := range naturals {
squares <- x * x
}
close(squares)
}()

// Printer (in main goroutine)
for x := range squares {
fmt.Println(x)
}
}

Reference